package com.cloudansys.core.flink.function;

import com.cloudansys.config.DefaultConfig;
import com.cloudansys.core.constant.Const;
import com.cloudansys.core.entity.MultiDataEntity;
import com.cloudansys.core.util.NumUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import redis.clients.jedis.Jedis;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Slf4j
public class WYProcessor extends ProcessFunction<List<MultiDataEntity>, List<MultiDataEntity>> {

    private static Jedis jedis;
    private static Map<Integer, Double> lastResetValues;
    private Long lastTime = 0L;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        jedis = DefaultConfig.getJedis();
        lastResetValues = initLastResetValue();
    }

    /**
     * 提升高度测量传感器原始高度：2.25米
     * 轮毂原始高度：84.44米
     * <p>
     * 提升高度计算逻辑：提升高度监测传感器，每次给的数据是传感器的高度，
     * 需要存储上次传感器的高度，此次传感器的高度减去所存储的上次传感器的高度才为塔筒的提升高度
     * （计算时每次需要减去传感器的原始高度）
     * <p>
     * 计算当前轮毂实际高度，并把当前轮毂实际高度实时更新到 redis 中，key 为 ts
     * 公式：【当前轮毂实际高度】 = 【轮毂提升高度】 + 【当前轮毂实际高度】[废]
     * 公式：【当前轮毂实际高度】 = 【轮毂原始高度】 + 【当前位移传感器高度】
     */
    @Override
    public void processElement(List<MultiDataEntity> elements, Context ctx, Collector<List<MultiDataEntity>> out) throws Exception {
        // 是否更新 redis 中存储的高度信息
        boolean flag = true;
        // 打印当前轮毂实际高度
        printCurrentHeight();
        // 高度传感器的原始高度
        String redisTsb = jedis.get(Const.TSB_KEY);
        if (redisTsb == null) {
            log.error("高度传感器原始高度获取失败！");
            return;
        }
        double tsb = NumUtil.formatDouble(Double.parseDouble(redisTsb));
        List<MultiDataEntity> resList = new ArrayList<>();
        String ts = jedis.get(Const.TS_REDIS);
        for (MultiDataEntity element : elements) {
            String projectId = element.getProjectId();
            String serialCode = element.getSerialCode();
            if (Const.TS_SC.equals(serialCode)) {
                Double[] values = element.getValues();
                // 此次高度传感器的高度
                double thisTsHeight = values[1];
                log.info("====【thisTsHeight: {}】====", thisTsHeight);
                // 提升高度（此次传感器的高度减去传感器的原始高度为提升高度）
                double liftHeight = NumUtil.formatDouble((thisTsHeight - tsb) / 1000);
                log.info("====【liftHeight: {}】====", liftHeight);
                // 当前高度
                double currentHeight = NumUtil.formatDouble(Const.TS_BASE + liftHeight);
//                log.info("====【currentHeight: {}】====", currentHeight);
                // 提升第一阶段，如果提升高度大于127.21m 就按127.21m算；提升第二阶段，如果提升高度大于 170m 就按 170m 算
                if ("29".equals(projectId) && currentHeight > 127.21) {
                    flag = false;
                    currentHeight = 127.21;
                    setHeightCache(projectId, String.valueOf(currentHeight));

                } else if ("37".equals(projectId) && currentHeight > 170) {
                    flag = false;
                    currentHeight = 170;
                    setHeightCache(projectId, String.valueOf(currentHeight));
                }
                if (flag) {
                    setHeightCache(projectId, String.valueOf(currentHeight));
                }

                // 第二个指标为位移值，即轮毂提升高度，单位为 mm；把第三个备用指标设置成【当前轮毂实际高度】，单位为 m
                values[2] = currentHeight;
                element.setValues(values);
            }
            // 把平整度原始数据减去复位值
            int sc = Integer.parseInt(serialCode);
            if (sc >= 36 && sc <= 53) {
                // 获取 redis 中存储的该传感器的复位值
                double resetValue = getResetValue(serialCode);
                // 页面展示数据 原始数据减去复位的数值
                Double value = NumUtil.formatDouble(element.getValues()[1] - resetValue);
                // 把第一个指标设置成 减去复位的数值
                element.getValues()[0] = value;
                // 把第三个指标设置成【当前轮毂实际高度】
                element.getValues()[2] = NumUtil.formatDouble(Double.parseDouble(ts));
                // 把第四个指标设置成复位值
                element.getValues()[3] = resetValue;
                // 判断是否进行了复位操作（有复位把第五个指标设置成1，默认0），即此次的复位值和上次的复位值不一样
                Double lastResetValue = lastResetValues.get(sc);
                if (resetValue != lastResetValue) {
                    element.getValues()[4] = 1.0;
                    // 同时更新上次的复位值
                    lastResetValues.put(sc, resetValue);
                } else {
                    element.getValues()[4] = 0.0;
                }
            }
            resList.add(element);
        }
        out.collect(resList);
    }

    /**
     * 存储提升高度信息
     */
    private void setHeightCache(String projectId, String currentHeight) {
        // 把【当前提升阶段】和【当前轮毂高度】实时更新到 redis 中
        jedis.set(Const.PID, projectId);
        jedis.set(Const.TS_REDIS, currentHeight);
    }

    /**
     * 打印振动阈值
     */
    private void printCurrentHeight() {
        long thisTime = System.currentTimeMillis();
        long diffTime = thisTime - lastTime;
        if (diffTime > 1000) {
            String currentHeight = jedis.get(Const.TS_REDIS);
            log.info("====【currentHeight: {}】====", currentHeight);
            lastTime = System.currentTimeMillis();
        }
    }

    /**
     * 初始化上次的复位值
     */
    private Map<Integer, Double> initLastResetValue() {
        Map<Integer, Double> values = new HashMap<>();
        for (int sc = 36; sc <= 56; sc++) {
            double resetValue = Double.parseDouble(jedis.get(Const.EVEN_RESET + sc));
            values.put(sc, resetValue);
        }
        return values;
    }

    /**
     * 获取复位值，默认值 0
     */
    private double getResetValue(String serialCode) {
        String redisKey = Const.EVEN_RESET + serialCode;
        String resetValue = jedis.get(redisKey);
        return Double.parseDouble(resetValue);
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (jedis != null) {
            jedis.close();
        }
    }

}
